In [1]:
import pandas as pd
from pyspark.sql import SparkSession

In [2]:
# The code was removed by DSX for sharing.


Out[2]:
[Row(_c0=1, CASE_STATUS=u'CERTIFIED-WITHDRAWN', EMPLOYER_NAME=u'UNIVERSITY OF MICHIGAN', SOC_NAME=u'BIOCHEMISTS AND BIOPHYSICISTS', JOB_TITLE=u'POSTDOCTORAL RESEARCH FELLOW', FULL_TIME_POSITION=u'N', PREVAILING_WAGE=u'36067', YEAR=u'2016', WORKSITE=u'ANN ARBOR, MICHIGAN', lon=u'-83.7430378', lat=u'42.2808256'),
 Row(_c0=2, CASE_STATUS=u'CERTIFIED-WITHDRAWN', EMPLOYER_NAME=u'GOODMAN NETWORKS, INC.', SOC_NAME=u'CHIEF EXECUTIVES', JOB_TITLE=u'CHIEF OPERATING OFFICER', FULL_TIME_POSITION=u'Y', PREVAILING_WAGE=u'242674', YEAR=u'2016', WORKSITE=u'PLANO, TEXAS', lon=u'-96.6988856', lat=u'33.0198431'),
 Row(_c0=3, CASE_STATUS=u'CERTIFIED-WITHDRAWN', EMPLOYER_NAME=u'PORTS AMERICA GROUP, INC.', SOC_NAME=u'CHIEF EXECUTIVES', JOB_TITLE=u'CHIEF PROCESS OFFICER', FULL_TIME_POSITION=u'Y', PREVAILING_WAGE=u'193066', YEAR=u'2016', WORKSITE=u'JERSEY CITY, NEW JERSEY', lon=u'-74.0776417', lat=u'40.7281575'),
 Row(_c0=4, CASE_STATUS=u'CERTIFIED-WITHDRAWN', EMPLOYER_NAME=u'GATES CORPORATION, A WHOLLY-OWNED SUBSIDIARY OF TOMKINS PLC', SOC_NAME=u'CHIEF EXECUTIVES', JOB_TITLE=u'REGIONAL PRESIDEN, AMERICAS', FULL_TIME_POSITION=u'Y', PREVAILING_WAGE=u'220314', YEAR=u'2016', WORKSITE=u'DENVER, COLORADO', lon=u'-104.990251', lat=u'39.7392358'),
 Row(_c0=5, CASE_STATUS=u'WITHDRAWN', EMPLOYER_NAME=u'PEABODY INVESTMENTS CORP.', SOC_NAME=u'CHIEF EXECUTIVES', JOB_TITLE=u'PRESIDENT MONGOLIA AND INDIA', FULL_TIME_POSITION=u'Y', PREVAILING_WAGE=u'157518.4', YEAR=u'2016', WORKSITE=u'ST. LOUIS, MISSOURI', lon=u'-90.1994042', lat=u'38.6270025')]
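The hidden cell above is what builds df_data_1 (DSX strips it when sharing, typically because it contains project credentials) and evidently ends with a take(5), which produced the list of Rows shown in Out[2]. Purely as a hedged sketch of how such a CSV is usually read (the file name below is a hypothetical placeholder, not the removed code):

spark = SparkSession.builder.getOrCreate()
df_data_1 = spark.read.csv('visa_cases.csv',   # hypothetical path; the real source was removed
                           header=True,
                           inferSchema=True)
df_data_1.take(5)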

In [3]:
df_data_1.describe()


Out[3]:
DataFrame[summary: string, _c0: string]
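describe() returns a new DataFrame rather than printing anything, and with no arguments it only summarizes numeric columns, which here is just _c0. To actually see the statistics, call show() on the result:

df_data_1.describe().show()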

In [4]:
df_data_1.printSchema()


root
 |-- _c0: integer (nullable = true)
 |-- CASE_STATUS: string (nullable = true)
 |-- EMPLOYER_NAME: string (nullable = true)
 |-- SOC_NAME: string (nullable = true)
 |-- JOB_TITLE: string (nullable = true)
 |-- FULL_TIME_POSITION: string (nullable = true)
 |-- PREVAILING_WAGE: string (nullable = true)
 |-- YEAR: string (nullable = true)
 |-- WORKSITE: string (nullable = true)
 |-- lon: string (nullable = true)
 |-- lat: string (nullable = true)
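Everything except _c0 was read as a string, most likely because some columns contain literal 'NA' entries (visible in lon and lat above) that block numeric inference. If the wage and year are needed as numbers they can be cast explicitly, with unparseable values becoming null; a minimal sketch:

from pyspark.sql.functions import col

df_typed = (df_data_1
            .withColumn('PREVAILING_WAGE', col('PREVAILING_WAGE').cast('double'))
            .withColumn('YEAR', col('YEAR').cast('int')))
df_typed.printSchema()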


In [5]:
sc


Out[5]:
<pyspark.context.SparkContext at 0x7fde33abfed0>

In [6]:
df_data_1.show()


+---+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+------------+----------+
|_c0|        CASE_STATUS|       EMPLOYER_NAME|            SOC_NAME|           JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|            WORKSITE|         lon|       lat|
+---+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+------------+----------+
|  1|CERTIFIED-WITHDRAWN|UNIVERSITY OF MIC...|BIOCHEMISTS AND B...|POSTDOCTORAL RESE...|                 N|          36067|2016| ANN ARBOR, MICHIGAN| -83.7430378|42.2808256|
|  2|CERTIFIED-WITHDRAWN|GOODMAN NETWORKS,...|    CHIEF EXECUTIVES|CHIEF OPERATING O...|                 Y|         242674|2016|        PLANO, TEXAS| -96.6988856|33.0198431|
|  3|CERTIFIED-WITHDRAWN|PORTS AMERICA GRO...|    CHIEF EXECUTIVES|CHIEF PROCESS OFF...|                 Y|         193066|2016|JERSEY CITY, NEW ...| -74.0776417|40.7281575|
|  4|CERTIFIED-WITHDRAWN|GATES CORPORATION...|    CHIEF EXECUTIVES|REGIONAL PRESIDEN...|                 Y|         220314|2016|    DENVER, COLORADO| -104.990251|39.7392358|
|  5|          WITHDRAWN|PEABODY INVESTMEN...|    CHIEF EXECUTIVES|PRESIDENT MONGOLI...|                 Y|       157518.4|2016| ST. LOUIS, MISSOURI| -90.1994042|38.6270025|
|  6|CERTIFIED-WITHDRAWN|BURGER KING CORPO...|    CHIEF EXECUTIVES|EXECUTIVE V P, GL...|                 Y|         225000|2016|      MIAMI, FLORIDA| -80.1917902|25.7616798|
|  7|CERTIFIED-WITHDRAWN|BT AND MK ENERGY ...|    CHIEF EXECUTIVES|CHIEF OPERATING O...|                 Y|          91021|2016|      HOUSTON, TEXAS| -95.3698028|29.7604267|
|  8|CERTIFIED-WITHDRAWN|GLOBO MOBILE TECH...|    CHIEF EXECUTIVES|CHIEF OPERATIONS ...|                 Y|         150000|2016|SAN JOSE, CALIFORNIA|-121.8863286|37.3382082|
|  9|CERTIFIED-WITHDRAWN|  ESI COMPANIES INC.|    CHIEF EXECUTIVES|           PRESIDENT|                 Y|         127546|2016|      MEMPHIS, TEXAS|          NA|        NA|
| 10|          WITHDRAWN|LESSARD INTERNATI...|    CHIEF EXECUTIVES|           PRESIDENT|                 Y|         154648|2016|    VIENNA, VIRGINIA| -77.2652604|38.9012225|
| 11|CERTIFIED-WITHDRAWN|  H.J. HEINZ COMPANY|    CHIEF EXECUTIVES|CHIEF INFORMATION...|                 Y|         182978|2016|PITTSBURGH, PENNS...| -79.9958864|40.4406248|
| 12|CERTIFIED-WITHDRAWN|DOW CORNING CORPO...|    CHIEF EXECUTIVES|VICE PRESIDENT AN...|                 Y|         163717|2016|   MIDLAND, MICHIGAN| -84.2472116|43.6155825|
| 13|CERTIFIED-WITHDRAWN|    ACUSHNET COMPANY|    CHIEF EXECUTIVES|   TREASURER AND COO|                 Y|       203860.8|2016|FAIRHAVEN, MASSAC...|          NA|        NA|
| 14|CERTIFIED-WITHDRAWN|       BIOCAIR, INC.|    CHIEF EXECUTIVES|CHIEF COMMERCIAL ...|                 Y|         252637|2016|      MIAMI, FLORIDA| -80.1917902|25.7616798|
| 15|CERTIFIED-WITHDRAWN|NEWMONT MINING CO...|    CHIEF EXECUTIVES|        BOARD MEMBER|                 Y|         105914|2016|GREENWOOD VILLAGE...|-104.9508141|39.6172101|
| 16|CERTIFIED-WITHDRAWN|        VRICON, INC.|    CHIEF EXECUTIVES|CHIEF FINANCIAL O...|                 Y|         153046|2016|  STERLING, VIRGINIA| -77.4291298|39.0066993|
| 17|CERTIFIED-WITHDRAWN|CARDIAC SCIENCE C...|  FINANCIAL MANAGERS|VICE PRESIDENT OF...|                 Y|          90834|2016| WAUKESHA, WISCONSIN| -88.2314813|43.0116784|
| 18|CERTIFIED-WITHDRAWN|WESTFIELD CORPORA...|    CHIEF EXECUTIVES|GENERAL MANAGER, ...|                 Y|         164050|2016|LOS ANGELES, CALI...|-118.2436849|34.0522342|
| 19|          CERTIFIED|      QUICKLOGIX LLC|    CHIEF EXECUTIVES|                 CEO|                 Y|         187200|2016|SANTA CLARA, CALI...|-121.9552356|37.3541079|
| 20|          CERTIFIED|MCCHRYSTAL GROUP,...|    CHIEF EXECUTIVES|PRESIDENT, NORTHE...|                 Y|         241842|2016|ALEXANDRIA, VIRGINIA| -77.0469214|38.8048355|
+---+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+------------+----------+
only showing top 20 rows
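show() truncates string columns to 20 characters by default, which is why the employer names and worksites are cut off. To read the full values, pass truncate=False (limited to a few rows here to keep the output manageable):

df_data_1.show(5, truncate=False)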


In [7]:
df_data_1.groupBy('SOC_NAME').count().show()


+--------------------+-----+
|            SOC_NAME|count|
+--------------------+-----+
|Software Develope...|  920|
|Food Service Mana...| 1242|
|Computer Hardware...|   17|
|Human Resources A...|   94|
|COMPUTER & INFORM...|    6|
|FOOD SERVICE MANA...|  370|
|PROPERTY, REAL ES...|    2|
|AGENTS AND BUSINE...|   96|
|     COST ESTIMATORS| 1451|
|SOFTWARE QUALITY ...|    3|
|CHEMICAL TECHNICIANS|  173|
|CAREER/TECHNICAL ...|    3|
|INSTRUCTIONAL COO...| 1145|
|MUSIC DIRECTORS A...|  138|
|INDUSTRIAL DESIGNERS|    1|
|NETWORK SECURITY ...|    1|
|AIRCRAFT CARGO HA...|    1|
|Vocational Educat...|    3|
|Property, Real Es...|  283|
|  Recreation Workers|  342|
+--------------------+-----+
only showing top 20 rows
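The same occupation appears under several capitalizations (for example the FOOD SERVICE MANAGERS rows above are counted twice, once upper-case and once mixed-case), which fragments the counts. One way to consolidate them is to normalize the case before grouping; a sketch using the built-in lower() column function:

from pyspark.sql.functions import lower, col

(df_data_1
 .groupBy(lower(col('SOC_NAME')).alias('soc_name'))
 .count()
 .orderBy('count', ascending=False)
 .show())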


In [8]:
test = df_data_1.rdd.map(lambda s: str(s).lower())

Here we convert each Row of the DataFrame to its string representation and lowercase it; the result is an RDD of plain strings, so the original column structure is lost.


In [9]:
test.take(5)


Out[9]:
["row(_c0=1, case_status=u'certified-withdrawn', employer_name=u'university of michigan', soc_name=u'biochemists and biophysicists', job_title=u'postdoctoral research fellow', full_time_position=u'n', prevailing_wage=u'36067', year=u'2016', worksite=u'ann arbor, michigan', lon=u'-83.7430378', lat=u'42.2808256')",
 "row(_c0=2, case_status=u'certified-withdrawn', employer_name=u'goodman networks, inc.', soc_name=u'chief executives', job_title=u'chief operating officer', full_time_position=u'y', prevailing_wage=u'242674', year=u'2016', worksite=u'plano, texas', lon=u'-96.6988856', lat=u'33.0198431')",
 "row(_c0=3, case_status=u'certified-withdrawn', employer_name=u'ports america group, inc.', soc_name=u'chief executives', job_title=u'chief process officer', full_time_position=u'y', prevailing_wage=u'193066', year=u'2016', worksite=u'jersey city, new jersey', lon=u'-74.0776417', lat=u'40.7281575')",
 "row(_c0=4, case_status=u'certified-withdrawn', employer_name=u'gates corporation, a wholly-owned subsidiary of tomkins plc', soc_name=u'chief executives', job_title=u'regional presiden, americas', full_time_position=u'y', prevailing_wage=u'220314', year=u'2016', worksite=u'denver, colorado', lon=u'-104.990251', lat=u'39.7392358')",
 "row(_c0=5, case_status=u'withdrawn', employer_name=u'peabody investments corp.', soc_name=u'chief executives', job_title=u'president mongolia and india', full_time_position=u'y', prevailing_wage=u'157518.4', year=u'2016', worksite=u'st. louis, missouri', lon=u'-90.1994042', lat=u'38.6270025')"]

In [16]:
test.groupBy('soc_name').take(5)



Py4JJavaErrorTraceback (most recent call last)
<ipython-input-16-bb0317f3b541> in <module>()
----> 1 test.groupBy('soc_name').take(5)

/usr/local/src/spark20master/spark/python/pyspark/rdd.py in take(self, num)
   1308 
   1309             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1310             res = self.context.runJob(self, takeUpToNumLeft, p)
   1311 
   1312             items += res

/usr/local/src/spark20master/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    931         # SparkContext#runJob.
    932         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 933         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    934         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    935 

/usr/local/src/spark20master/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/local/src/spark20master/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/local/src/spark20master/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 13.0 failed 10 times, most recent failure: Lost task 0.9 in stage 13.0 (TID 66, yp-spark-dal09-env5-0042): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/src/spark20master/spark-2.0.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/usr/local/src/spark20master/spark-2.0.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/src/spark20master/spark/python/pyspark/rdd.py", line 2371, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/src/spark20master/spark/python/pyspark/rdd.py", line 2371, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/src/spark20master/spark/python/pyspark/rdd.py", line 317, in func
    return f(iterator)
  File "/usr/local/src/spark20master/spark/python/pyspark/rdd.py", line 1876, in combine
    merger.mergeValues(iterator)
  File "/usr/local/src/spark20master/spark-2.0.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/shuffle.py", line 236, in mergeValues
    for k, v in iterator:
  File "/usr/local/src/spark20master/spark/python/pyspark/rdd.py", line 701, in <lambda>
    return self.map(lambda x: (f(x), x)).groupByKey(numPartitions, partitionFunc)
TypeError: 'str' object is not callable

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:322)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:390)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:322)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.lang.Thread.run(Thread.java:785)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1461)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1449)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1448)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1448)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:812)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:812)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:812)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1674)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1629)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at java.lang.Thread.getStackTrace(Thread.java:1117)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:633)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1887)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1900)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
	at java.lang.reflect.Method.invoke(Method.java:507)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:785)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/src/spark20master/spark-2.0.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/usr/local/src/spark20master/spark-2.0.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/src/spark20master/spark/python/pyspark/rdd.py", line 2371, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/src/spark20master/spark/python/pyspark/rdd.py", line 2371, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/src/spark20master/spark/python/pyspark/rdd.py", line 317, in func
    return f(iterator)
  File "/usr/local/src/spark20master/spark/python/pyspark/rdd.py", line 1876, in combine
    merger.mergeValues(iterator)
  File "/usr/local/src/spark20master/spark-2.0.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/shuffle.py", line 236, in mergeValues
    for k, v in iterator:
  File "/usr/local/src/spark20master/spark/python/pyspark/rdd.py", line 701, in <lambda>
    return self.map(lambda x: (f(x), x)).groupByKey(numPartitions, partitionFunc)
TypeError: 'str' object is not callable

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:322)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:390)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:322)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
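The root cause is in the last Python frame: RDD.groupBy expects a key function, not a column name, so Spark ends up trying to call the string 'soc_name' on each element and raises TypeError: 'str' object is not callable. On top of that, test is an RDD of plain lowercased strings, so there is no soc_name field left to group on anyway. Two working alternatives, sketched against the original df_data_1:

# Group the RDD of Rows with a key function (guarding against missing SOC_NAME values)
df_data_1.rdd.groupBy(lambda row: (row['SOC_NAME'] or '').lower()).mapValues(len).take(5)

# Or stay in the DataFrame API, which does accept column names
df_data_1.groupBy('SOC_NAME').count().show()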

In [11]:
rdd_data_1 = df_data_1.rdd

In [12]:
newdata = rdd_data_1.map(lambda row: [str(v).lower() for v in row])  # Row has no .lower(); lowercase each field as a string instead
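A quick way to verify the result, plus an optional round-trip back to a DataFrame (a sketch that assumes the original column names are still wanted; lowered_df is a new name introduced here):

newdata.take(3)

lowered_df = newdata.toDF(df_data_1.columns)
lowered_df.show(3)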